<!DOCTYPE HTML>
<html>
<head>
    <meta http-equiv="content-type" content="text/html; charset=utf-8"/>
    <meta name="google-site-verification" content="KEatQX-J4dYY-6J2KU_aP5X8gAJ8wS0lhylI8umX6WA" />
    <meta name="viewport" content="width=device-width,initial-scale=1,minimal-ui">
    <link rel="shortcut icon" href="../images/favicon.ico">
    <link rel="stylesheet" href="../css/code.css" type="text/css"/>
    <link rel="stylesheet" href="../css/bootstrap.css" type="text/css"/>
    <link rel="stylesheet" href="../css/main.css" type="text/css"/>
    <title>编程小梦|Apache Doris Colocate Join 原理与实践</title>
</head>
<body>
<nav class="navbar navbar-default navbar-static-top" style="opacity: .9" role="navigation">
    <div class="container-fluid">
        <div class="navbar-header">
            <button type="button" class="navbar-toggle" data-toggle="collapse"
                    data-target="#bs-example-navbar-collapse-1">
                <span class="sr-only">Toggle navigation</span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
            </button>
            <a class="navbar-brand" href="/">编程小梦</a>
        </div>
        <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
            <ul class="nav navbar-nav navbar-right">
                <li class="active"><a href="/">Blog</a></li>
                
                <li><a href="https://github.com/kangkaisen" target="_blank" rel="nofollow">GitHub</a></li>
                
                
                <li><a href="http://weibo.com/533234148" target="_blank" rel="nofollow">WeiBo</a></li>
                
            </ul>
        </div>
    </div>
</nav>
<div class="row" style="padding-top: 60px">
    <div class="container center-block">
        <div class="col-md-1"></div>
        <div class="col-md-10 col-sm-12">
            <h1> Apache Doris Colocate Join 原理与实践</h1>
            <hr/>
            <p>作者: 康凯森</p>
            <p>日期: 2019-01-06</p>
            <p>分类: <a href="../tag/OLAP.html" target="_blank" >OLAP</a></p>
            <hr/>
            <!-- toc -->
<ul>
<li><a href="#what-colocate-join">What Colocate Join</a></li>
<li><a href="#why-colocate-join">Why Colocate Join</a></li>
<li><a href="#how-colocate-join">How Colocate Join</a><ul>
<li><a href="#核心思路">核心思路</a></li>
<li><a href="#术语定义">术语定义</a></li>
<li><a href="#1-数据导入时保证本地性">1 数据导入时保证本地性</a></li>
<li><a href="#2-colocate-join-query-plan">2 Colocate Join Query Plan</a></li>
<li><a href="#3-colocate-join-query-schedule">3 Colocate Join Query Schedule</a></li>
<li><a href="#4-colocate-join-at-bucket-seq-level">4 Colocate Join At Bucket Seq Level</a></li>
<li><a href="#5-colocate-join-metadata-maintenance">5 Colocate Join Metadata Maintenance</a></li>
<li><a href="#6-how-to-decide-a-query-can-colocate-join">6 How to decide a query can colocate join</a></li>
<li><a href="#7-colocate-join-support-balance">7 Colocate Join Support Balance</a></li>
</ul>
</li>
<li><a href="#colocate-join-performance">Colocate Join Performance</a></li>
<li><a href="#how-to-use-colocate-join">How To Use Colocate Join</a><ul>
<li><a href="#colocate-join-目前限制">Colocate Join 目前限制</a></li>
<li><a href="#colocate-join-适用场景">Colocate Join 适用场景</a></li>
<li><a href="#colocate-join-faq">Colocate Join FAQ</a></li>
</ul>
</li>
<li><a href="#总结">总结</a></li>
</ul>
<!-- toc stop -->
<p>Colocate Join 是我贡献给 Apache Doris 社区的第一个大 Feature，在我们生产环境已经稳定运行 3 个多月，上线效果明显。 本文总结下 Apache Doris Colocate Join 原理与实践。</p>
<h2 id="what-colocate-join">What Colocate Join</h2>
<p>我们都知道 Join 的常见连接类型分为以下几种：</p>
<ul>
<li>INNER JOIN</li>
<li>OUTER JOIN</li>
<li>CROSS JOIN</li>
<li>SEMI JOIN</li>
<li>ANTI JOIN</li>
</ul>
<p>Join 的常见算法实现包含以下几种：</p>
<ul>
<li>Nested Loop Join</li>
<li>Sort Merge Join</li>
<li>Hash Join</li>
</ul>
<p>分布式系统实现 Join 数据分布的常见策略有：</p>
<ul>
<li>Shuffle Join</li>
<li>Broadcast Join</li>
<li>Colocate/Local Join</li>
</ul>
<p>Colocate/Local Join 就是指多个节点 Join 时没有数据移动和网络传输，每个节点只在本地进行 Join，能够本地进行 Join 的前提是相同 Join Key 的数据分布在相同的节点。</p>
<h2 id="why-colocate-join">Why Colocate Join</h2>
<p>相比 Shuffle Join 和 Broadcast Join,Colocate Join 在查询时没有数据的网络传输，性能会更高。 在 Doris 的具体实现中，Colocate Join 相比 Shuffle Join 可以拥有更高的并发粒度，也可以显著提升 Join 的性能，这一点在后面会解释。</p>
<h2 id="how-colocate-join">How Colocate Join</h2>
<h3 id="核心思路">核心思路</h3>
<p><strong>对于 colocate tables，在任何情况下都要保证数据的本地性。</strong> 具体包括：</p>
<ul>
<li>数据导入时保证数据本地性</li>
<li>查询调度时保证数据本地性</li>
<li>数据 balance 后保证数据本地性</li>
</ul>
<p>实现中最复杂是第 3 点: 处理 colocate tables 的 balance。</p>
<h3 id="术语定义">术语定义</h3>
<p><strong>Colocate Group</strong></p>
<p>我们将一组具体相同 Colocate 属性的 Table 称为 Group，下图中 t1 和 t2 拥有相同的 Colocate Group。</p>
<p><strong>Colocate Parent Table</strong></p>
<p>我们将决定一个 Group 数据分布的 Table 称为 Parent Table，下图中 t1 是 Colocate Parent Table.</p>
<p><strong>Colocate Child Table</strong> </p>
<p>我们将一个 Group 中除 Parent Table 之外的 Table 称为 Child Table，下图中 t2 是 Colocate Child Table.</p>
<p><img src="media/15467658860806/15467681739579.png" alt="Doris Colocate Join"></p>
<p><strong>Bucket Seq</strong></p>
<p>如下图，如果一个表有 N 个 Partition, 则每个 Partition 的第 M 个 bucket 的 Bucket Seq 是 M。</p>
<p><img src="media/15467658860806/15467686993734.png" alt="Doris Bucket Seq"></p>
<h3 id="1-数据导入时保证本地性">1 数据导入时保证本地性</h3>
<p>Doris 的分区方式如下所示，先根据分区字段 Range 分区，再根据指定的 Distributed Key Hash 分桶：</p>
<p><img src="media/15467658860806/Doris-buckets.png" alt="Doris-buckets"></p>
<p>所以我们在数据导入时保证本地性的核心思想就是<strong>两次映射</strong>，对于 colocate tables，我们保证相同 Distributed Key 的数据映射到相同的 Bucket Seq，再保证相同 Bucket Seq 的 buckets 映射到相同的 BE。</p>
<p><img src="media/15467658860806/15467694968691.png" alt="Doris Colocate Join"></p>
<p>具体来说，第一步：我们计算 Distributed Key 的 hash 值，并对 bucket num 取模，保证相同 Distributed Key 的数据映射到相同的 Bucket Seq。</p>
<p><img src="media/15467658860806/15467695936182.png" alt="Doris Colocate Join"></p>
<p>第二步：将同一个 Colocate Group 下所有相同 Bucket Seq 的 Bucket 映射到相同的 BE，方法如下：</p>
<ol>
<li>Group 中所有 Table 的 Bucket Seq 和 BE 节点的映射关系和 Parent Table 一致</li>
<li>Parent Table 中所有 Partition 的 Bucket Seq 和 BE 节点的映射关系和第一个 Partition 一致</li>
<li>Parent Table 第一个 Partition 的 Bucket Seq 和 BE 节点的映射关系利用原生的 Round Robin 算法决定 </li>
</ol>
<p><img src="media/15467658860806/15467696882342.png" alt="Doris Colocate Join"></p>
<h3 id="2-colocate-join-query-plan">2 Colocate Join Query Plan</h3>
<p>对 HashJoinFragment，由于 Join 的多张表有了数据本地性保证，所以可以去掉 Exchange Node，避免网络传输，将 ScanNode 直接设置为 Hash Join Node 的 Child。</p>
<p><img src="media/15467658860806/15467699813002.png" alt="Doris Colocate Join"></p>
<h3 id="3-colocate-join-query-schedule">3 Colocate Join Query Schedule</h3>
<p>查询调度的目标： 一个 Colocate join 中所有 ScanNode 中所有 Bucket Seq 相同的 Buckets 被调度到同一个 BE。</p>
<p>查询调度的策略：第一个 ScanNode 的 Buckets 随机选择 BE，其余的 ScanNode 和第一个 ScanNode 保持一致。</p>
<h3 id="4-colocate-join-at-bucket-seq-level">4 Colocate Join At Bucket Seq Level</h3>
<p>目前，Doris 的 Hash Join 是 Server 粒度的：</p>
<p><img src="media/15467658860806/15467755170205.png" alt="Doris Hash Join"></p>
<p>对于 colocate join，由于同一个 Colocate Group 下相同 Bucket Seq 的 Bucket 分布在相同的 BE，所以我们将 Join 的粒度从 Server 粒度降至 Bucket Seq 粒度：</p>
<p><img src="media/15467658860806/15467756497745.png" alt="Doris colocate join"></p>
<h3 id="5-colocate-join-metadata-maintenance">5 Colocate Join Metadata Maintenance</h3>
<p>对于 colocate join，我们需要维护以下几个核心元数据：</p>
<p><img src="media/15467658860806/15467757881945.png" alt="Doris Colocate Join Metadata"></p>
<ul>
<li>代码中，colocate group id 就是 colocate parent table id</li>
<li>group2BackendsPerBucketSeq 代表每个 colocate group 中每个 bucket seq 映射到哪些 BE</li>
<li>为了支持 balance，以及保证元数据的一致性，这些元数据都需要持久化</li>
</ul>
<h3 id="6-how-to-decide-a-query-can-colocate-join">6 How to decide a query can colocate join</h3>
<ol>
<li>Join 的 tables 是 colocate able</li>
<li>The colocate group 是 stable 状态，没有 balancing</li>
<li>Join 的 Key 包含分桶的 Distributed Key</li>
</ol>
<h3 id="7-colocate-join-support-balance">7 Colocate Join Support Balance</h3>
<p><strong>核心思路</strong>：</p>
<p>新增一个 daemon 线程专门处理 colocate table 的 balance，并让正常的 balance 线程不处理 colocate table 的 balance。</p>
<p><strong>何时 balance</strong>： </p>
<p>有 BE 节点新增，删除，down 掉时。</p>
<p><strong>balance 的粒度</strong>： </p>
<p>正常 balance 的粒度是 bucket，但是对于 colocate table，我们必须保证同一个 colocate group 下所有 bucket 的数据本地性，所以我们 balance 的单位是 colocate group。</p>
<p><strong>balance 对查询的影响</strong>：</p>
<p>当一个 colocate group 正在 balance 时，colocate join 会退化为原始的 shuffle join 或 broadcast join。</p>
<p><strong>balance 流程：</strong></p>
<ol>
<li>为需要复制或迁移的 Bucket 选择目标 BE</li>
<li>标记 colocate group 的转态为 balancing</li>
<li>对于需要复制或迁移的 Bucket，发起 Clone Job，Clone Job 会从 Bucket 的现有副本复制一个新副本目标 BE</li>
<li>更新 backendsPerBucketSeq（维护 Bucket Seq 到 BE 映射关系的元数据）</li>
<li>当一个 colocate group 下的所有 Clone Job 都完成时，标记 colocate group 的转态为 stable</li>
<li>删除冗余的副本</li>
</ol>
<p>当有 BE 节点删除或长时间挂掉时，选择目标 BE 的策略：</p>
<p>和正常 balance 时的选择策略相同，考虑集群的整体负载，尽量选择负载较低的 BE。</p>
<p>当有 BE 节点新增时，选择目标 BE 的策略：</p>
<ol>
<li>对于当前 colocate group，计算每个新增 BE 需要增加的 bucket seqs 个数：假如我们有 3 个 BE，8 个 bucket，每个 bucket 是 3 副本，则每个 BE 负责 8 个 bucket 副本，我们新增 1 个 BE 后，可以计算出每个 BE 负责的平均 bucket 副本数应该是 3 * 8 / 4 = 6，每个新增 BE 需要增加的 bucket seqs 个数为 6 / 1 = 6.</li>
<li>对于每个 bucket seqs, 随机选择从哪个旧的 BE 迁移副本到新增的 BE。</li>
</ol>
<h2 id="colocate-join-performance">Colocate Join Performance</h2>
<p><strong>测试数据：</strong></p>
<p>Table A,B,C 都有 10 天数据，1 天一个 partitions，每个 partition 有 570 万数据。</p>
<p><strong>测试集群：</strong></p>
<p>4 台低配物理机，每个 BE 24CPU，96MEM</p>
<p><strong>测试 SQL:</strong></p>
<p>SQL1: </p>
<pre><code>select count(*)  
FROM A t1
INNER JOIN [shuffle] B t5
   ON ((t1.dt = t5.dt) AND (t1.id = t5.id))
INNER JOIN [shuffle] C t6
   ON ((t1.dt = t6.dt) AND (t1.id = t6.id))
where t1.dt in (xxx days);
</code></pre><p>SQL2: </p>
<pre><code>select t1.dt, t1.id, t1.name, t1.second_id,t1.second_name,
t5.id, t5.weight_time,t5.list,
t6.ord_id, t6._id
FROM A t1
INNER JOIN B t5
   ON ((t1.dt = t5.dt) AND (t1.id = t5.id))
INNER JOIN C t6
   ON ((t1.dt = t6.dt) AND (t1.id = t6.id))
where t1.dt in (xxx days)
limit 10000;
</code></pre><p><strong>Test Result for SQL1:</strong></p>
<p><img src="media/15467658860806/15467799035787.png" alt="Doris Colocate Join Performance"></p>
<p><strong>Test Result for SQL2:</strong></p>
<p><img src="media/15467658860806/15467799236266.png" alt="Doris Colocate Join Performance"></p>
<p>可以看到，Colocate Join 相比 Shuffle Join 有明显的性能提升，<strong>而且随着集群规模越大，Join 的数据量越多，Colocate Join 的优势会更明显。</strong></p>
<h2 id="how-to-use-colocate-join">How To Use Colocate Join</h2>
<p>社区最新代码已经支持 Colocate Join，只不过默认是关闭的，只需要在 FE 配置中设置 disable_colocate_join 为 false，即可开启 Colocate Join 功能。</p>
<p>具体使用时只需要在建表时增加 colocate_with 这个属性即可，colocate_with 的值可以设置成同一组 colocate 表中的任意一个，不过需要保证 colocate_with 属性中的表要先建立。</p>
<p>假如需要对 table t1 和 t2 进行 Colocate Join，可以按以下语句建表：</p>
<pre><code>CREATE TABLE `t1` (
  `id` int(11) COMMENT &quot;&quot;,
  `value` varchar(8) COMMENT &quot;&quot;
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
&quot;colocate_with&quot; = &quot;t1&quot;
);


CREATE TABLE `t2` (
  `id` int(11) COMMENT &quot;&quot;,
  `value` varchar(8) COMMENT &quot;&quot;
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
&quot;colocate_with&quot; = &quot;t1&quot;
);
</code></pre><h3 id="colocate-join-目前限制">Colocate Join 目前限制</h3>
<ol>
<li>Colocate Table 必须是 OLAP 类型的表</li>
<li>colocate_with 属性相同表的 BUCKET 数必须一样</li>
<li>colocate_with 属性相同表的 副本数必须一样 （这个限制之后可能会去掉，但对用户应该没有实际影响）</li>
<li>colocate_with 属性相同表的 DISTRIBUTED Columns 的数据类型必须一样</li>
</ol>
<h3 id="colocate-join-适用场景">Colocate Join 适用场景</h3>
<p>Colocate Join 十分适合几张表按照相同字段分桶，并高频根据相同字段 Join 的场景，比如电商的不少应用都按照商家 Id 分桶，并高频按照商家 Id 进行 Join。</p>
<h3 id="colocate-join-faq">Colocate Join FAQ</h3>
<p>一句话总结，<strong>凡是不能进行 Colocate Join 的场景都会自动退化为原始的 Shuffle Join 或者 Broadcast Join</strong>。</p>
<p>Q1: 支持多张表进行 Colocate Join 吗?</p>
<p>A: 支持</p>
<p>Q2: 支持 Colocate 表和正常表 Join 吗？</p>
<p>A: 支持</p>
<p>Q3: Colocate 表支持用非分桶的 Key 进行 Join 吗？</p>
<p>A: 支持：不符合 Colocate Join 条件的 Join 会使用 Shuffle Join 或 Broadcast Join</p>
<p>Q4: 如何确定 Join 是按照 Colocate Join 执行的？</p>
<p>A: explain 的结果中 Hash Join 的孩子节点如果直接是 OlapScanNode， 没有 Exchange Node，就说明是 Colocate Join</p>
<p>Q5: 如何修改 colocate_with 属性？</p>
<p>A: ALTER TABLE example_db.my_table set (&quot;colocate_with&quot;=&quot;target_table&quot;);</p>
<p>Q6: 如何禁用 colocate join?</p>
<p>A: set disable_colocate_join = true; 就可以禁用 Colocate Join，查询时就会使用 Shuffle Join 或 Broadcast Join</p>
<h2 id="总结">总结</h2>
<p>大多数支持 Join 的 OLAP 系统都会考虑支持 Colocate Join，比如 MemSQL, SnappyData, 阿里 AnalyticDB 等，阿里 AnalyticDB 更是在数据模型中就引入了 Table Group 的概念。总的来讲，Colocate Join 通过在数据导入，查询 Plan，查询调度，数据 balance 时对数据本地性的保证和考虑，可以显著加速特定场景的下 Join 查询，是一个十分有用的 Feature。</p>

            <hr/>
            <div style="padding: 0; margin: 10px auto; width: 90%; text-align: center">
                <button id="rewardButton" , disable="enable" ,
                        onclick="var qr = document.getElementById('QR'); if (qr.style.display === 'none') {qr.style.display='block';} else {qr.style.display='none'}"
                        ,
                        style="cursor: pointer; border: 0; outline: 0; border-radius: 100%; padding: 0; margin: 0; letter-spacing: normal; text-transform: none; text-indent: 0px; text-shadow: none">
                    <span style="display: inline-block; width: 60px; height: 60px; border-radius: 100%; line-height: 58px; color: #fff; font-size:36px; font-family: 'Palatino Linotype', 'Book Antiqua', Palatino, Helvetica, STKaiti, SimSun, serif; background: rgb(236,96,0)">赞</span>
                </button>
                <div id="QR" style="display: none;">
                    <p><img src="../images/weixin.jpeg" width="200" /></p>
                    <p><img src="../images/zhifubao.jpeg" width="200" /></p>
                </div>

            </div>
            <h3>评论</h3>
            <div id="vcomment"></div>
        </div>
        <div class="col-md-1"></div>
    </div>
</div>

<div class="row" style="padding-top: 60px">
    <div class="container center-block">
        <div class="col-md-1"></div>
        <div class="col-md-10 col-sm-12">
            <div class="ds-thread"
                 data-thread-key=5c3200674b5a7a0a656cbe0e
                 data-title=Apache Doris Colocate Join 原理与实践
                 data-url=doris-colocate-join>
            </div>
        </div>
        <div class="col-md-1"></div>
    </div>
</div>

<div class="footer">
    <a href="https://www.bcmeng.com/" target="_blank"  rel="nofollow">康凯森</a>
</div>

<script src="../js/code.js"></script>
<script>hljs.initHighlightingOnLoad();</script>
<script src="../js/jquery.min.js"></script>
<script src="../js/bootstrap.js"></script>
<script>
    var _hmt = _hmt || [];
    (function() {
        var hm = document.createElement("script");
        hm.src = "https://hm.baidu.com/hm.js?1d198a377ef466190881d1c021155925";
        var s = document.getElementsByTagName("script")[0];
        s.parentNode.insertBefore(hm, s);
    })();
</script>
<script src="../js/av-min.js"></script>
<script src='../js/Valine.min.js'></script>
<script type="text/javascript">
    window.valine = new Valine({
        el: '#vcomment' ,
        verify: true,
        notify: true,
        appId: 'BlLnB0re5OzQVzrgEplAxkyg-gzGzoHsz',
        appKey: 'wUyxSV0U4Vi7oK1EHK6ipErv',
        placeholder: '欢迎评论'
    });
</script>

</body>
</html>